{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "8c1d0c08",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "96a248f5",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/02/17 21:59:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('test') \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "c53274b1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "5d8434e1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import types"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "a84c6c6d",
   "metadata": {},
   "outputs": [],
   "source": [
    "green_schema = types.StructType([\n",
    "    types.StructField(\"VendorID\", types.IntegerType(), True),\n",
    "    types.StructField(\"lpep_pickup_datetime\", types.TimestampType(), True),\n",
    "    types.StructField(\"lpep_dropoff_datetime\", types.TimestampType(), True),\n",
    "    types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
    "    types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
    "    types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
    "    types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
    "    types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
    "    types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
    "    types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"extra\", types.DoubleType(), True),\n",
    "    types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
    "    types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"ehail_fee\", types.DoubleType(), True),\n",
    "    types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
    "    types.StructField(\"total_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"payment_type\", types.IntegerType(), True),\n",
    "    types.StructField(\"trip_type\", types.IntegerType(), True),\n",
    "    types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
    "])\n",
    "\n",
    "yellow_schema = types.StructType([\n",
    "    types.StructField(\"VendorID\", types.IntegerType(), True),\n",
    "    types.StructField(\"tpep_pickup_datetime\", types.TimestampType(), True),\n",
    "    types.StructField(\"tpep_dropoff_datetime\", types.TimestampType(), True),\n",
    "    types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
    "    types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
    "    types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
    "    types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
    "    types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
    "    types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
    "    types.StructField(\"payment_type\", types.IntegerType(), True),\n",
    "    types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"extra\", types.DoubleType(), True),\n",
    "    types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
    "    types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
    "    types.StructField(\"total_amount\", types.DoubleType(), True),\n",
    "    types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "3f7e0cb9",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/1\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/2\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/4\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/5\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/6\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/7\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/8\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/9\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/10\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/11\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/12\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "year = 2020\n",
    "\n",
    "for month in range(1, 13):\n",
    "    print(f'processing data for {year}/{month}')\n",
    "\n",
    "    input_path = f'data/raw/green/{year}/{month:02d}/'\n",
    "    output_path = f'data/pq/green/{year}/{month:02d}/'\n",
    "\n",
    "    df_green = spark.read \\\n",
    "        .option(\"header\", \"true\") \\\n",
    "        .schema(green_schema) \\\n",
    "        .csv(input_path)\n",
    "\n",
    "    df_green \\\n",
    "        .repartition(4) \\\n",
    "        .write.parquet(output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "96ac2ad7",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/1\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/2\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/4\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/5\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/6\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/7\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 15:>                                                         (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/8\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r"
     ]
    },
    {
     "ename": "AnalysisException",
     "evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAnalysisException\u001b[0m                         Traceback (most recent call last)",
      "\u001b[0;32m/tmp/ipykernel_129101/906373977.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      7\u001b[0m     \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/green/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m     \u001b[0mdf_green\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     10\u001b[0m         \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     11\u001b[0m         \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mgreen_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m    536\u001b[0m             \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    537\u001b[0m         \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    539\u001b[0m         \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    540\u001b[0m             \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m   1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1303\u001b[0m         \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m         return_value = get_return_value(\n\u001b[0m\u001b[1;32m   1305\u001b[0m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m   1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m    132\u001b[0m                 \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    133\u001b[0m                 \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m                 \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    135\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    136\u001b[0m                 \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
      "\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;"
     ]
    }
   ],
   "source": [
    "year = 2021 \n",
    "\n",
    "for month in range(1, 13):\n",
    "    print(f'processing data for {year}/{month}')\n",
    "\n",
    "    input_path = f'data/raw/green/{year}/{month:02d}/'\n",
    "    output_path = f'data/pq/green/{year}/{month:02d}/'\n",
    "\n",
    "    df_green = spark.read \\\n",
    "        .option(\"header\", \"true\") \\\n",
    "        .schema(green_schema) \\\n",
    "        .csv(input_path)\n",
    "\n",
    "    df_green \\\n",
    "        .repartition(4) \\\n",
    "        .write.parquet(output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "463c7dc8",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "6ff4265d",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "6e982d29",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "19326bc9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/1\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/2\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/4\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/5\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/6\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/7\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/8\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/9\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/10\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/11\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2020/12\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "year = 2020\n",
    "\n",
    "for month in range(1, 13):\n",
    "    print(f'processing data for {year}/{month}')\n",
    "\n",
    "    input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
    "    output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
    "\n",
    "    df_yellow = spark.read \\\n",
    "        .option(\"header\", \"true\") \\\n",
    "        .schema(yellow_schema) \\\n",
    "        .csv(input_path)\n",
    "\n",
    "    df_yellow \\\n",
    "        .repartition(4) \\\n",
    "        .write.parquet(output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "aeca811a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/1\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/2\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/4\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/5\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/6\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/7\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 78:===========================================>              (3 + 1) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing data for 2021/8\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r"
     ]
    },
    {
     "ename": "AnalysisException",
     "evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAnalysisException\u001b[0m                         Traceback (most recent call last)",
      "\u001b[0;32m/tmp/ipykernel_129101/2088663510.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      7\u001b[0m     \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/yellow/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m     \u001b[0mdf_yellow\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     10\u001b[0m         \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     11\u001b[0m         \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0myellow_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m    536\u001b[0m             \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    537\u001b[0m         \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    539\u001b[0m         \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    540\u001b[0m             \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m   1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1303\u001b[0m         \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m         return_value = get_return_value(\n\u001b[0m\u001b[1;32m   1305\u001b[0m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m   1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m    132\u001b[0m                 \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    133\u001b[0m                 \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m                 \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    135\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    136\u001b[0m                 \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
      "\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;"
     ]
    }
   ],
   "source": [
    "year = 2021\n",
    "\n",
    "for month in range(1, 13):\n",
    "    print(f'processing data for {year}/{month}')\n",
    "\n",
    "    input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
    "    output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
    "\n",
    "    df_yellow = spark.read \\\n",
    "        .option(\"header\", \"true\") \\\n",
    "        .schema(yellow_schema) \\\n",
    "        .csv(input_path)\n",
    "\n",
    "    df_yellow \\\n",
    "        .repartition(4) \\\n",
    "        .write.parquet(output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d7eb0da9",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
